package mnet

/*
 * net connection.
 */
import (
	"errors"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// //////////////The following variables can be modified/////////////////////////
// Tunable buffer sizes, in bytes.
var (
	// gWriteBufferMaxSize is the length of the batching buffer allocated by
	// loopWriter; a message that does not fit in the remaining space is
	// written on its own.
	gWriteBufferMaxSize = 10 << 10

	// gReadBufferMaxSize is the length of the buffer allocated by loopReader
	// to hold one packet (head plus data).
	gReadBufferMaxSize = 10 << 10
)

//////////////////////////////////////////////////////////////////////////////////

// Fixed tuning parameters for the reader and writer goroutines.
const (
	// gSendPacketTimeoutTime is the longest loopWriter holds back buffered
	// data that is still smaller than gMinPacketSize, measured from the
	// previous send.
	gSendPacketTimeoutTime = 50 * time.Millisecond

	// gWriteGoroutineSleepTime is how long loopWriter sleeps after a pass
	// that ended because outChan was empty.
	gWriteGoroutineSleepTime = 10 * time.Millisecond

	// gReadMessageTimeout is the deadline loopReader applies to reading a
	// packet's data once its head has been parsed. It is only used when
	// gReadTimeoutFlag is greater than 0.
	gReadMessageTimeout = 5 * time.Second

	// gReadTimeoutFlag enables the per-packet read deadline when it is
	// greater than 0; 0 leaves reads without a deadline.
	gReadTimeoutFlag = int(0)

	// gOutPutChanSize is the buffer capacity of a connection's outChan.
	gOutPutChanSize = 2 << 10

	// gMinPacketSize is the number of buffered bytes at which loopWriter
	// sends without waiting for gSendPacketTimeoutTime.
	gMinPacketSize = 2 << 10
)

// Values stored in mConnection.isClosed.
const (
	// gClosedFlag marks a connection that is not open: either start has not
	// run yet or the connection has been stopped.
	gClosedFlag int32 = 0

	// gOpenFlag marks a connection that start has opened.
	gOpenFlag int32 = 1
)

// mConnection is one TCP connection together with its outgoing message
// queue, user properties, session callbacks and packet codec.
type mConnection struct {
	net          INet                   // owning net module; doExit uses it to reach the connection manager
	conn         *net.TCPConn           // underlying TCP connection
	connID       uint32                 // connection id
	isClosed     int32                  // open/closed state: gClosedFlag (0) or gOpenFlag (1); accessed atomically
	doClose      int32                  // 0 until doExit has closed the socket, then 1; accessed atomically
	closeOnce    sync.Once              // makes doExit run its teardown only once
	outChan      chan []byte            // queued outgoing messages; a nil entry tells loopWriter to exit
	property     map[string]interface{} // user-defined connection properties
	propertyLock sync.RWMutex           // guards property
	session      ISession               // user session that receives the connection callbacks
	codec        ICodec                 // packet codec used by loopReader to frame incoming packets
}

// newConnection builds the mConnection for conn with the given id, session
// and codec. The connection is created in the closed state (gClosedFlag) with
// an empty property map and an outChan of capacity gOutPutChanSize; start
// opens it.
//
// It panics when s or codec is nil.
func newConnection(net INet, conn *net.TCPConn,
	connId uint32, s ISession, codec ICodec,
) *mConnection {
	switch {
	case s == nil:
		panic("mnet: session is nil")
	case codec == nil:
		panic("mnet: packet parser is nil")
	}

	mc := new(mConnection)

	// Identity and collaborators.
	mc.net = net
	mc.conn = conn
	mc.connID = connId
	mc.session = s
	mc.codec = codec

	// State: not open yet, teardown not performed.
	mc.isClosed = gClosedFlag
	mc.doClose = 0

	// Containers.
	mc.outChan = make(chan []byte, gOutPutChanSize)
	mc.property = make(map[string]interface{})

	return mc
}

// SetLinger forwards sec to the underlying TCP connection's SetLinger and
// returns its error. When the connection has no TCP conn it does nothing and
// returns nil.
func (mc *mConnection) SetLinger(sec int) error {
	if mc.conn == nil {
		return nil
	}
	return mc.conn.SetLinger(sec)
}

// SyncSend writes data to the TCP connection directly from the calling
// goroutine, without going through outChan. It returns an error when data is
// nil, when the connection is closed, or when the write fails.
func (mc *mConnection) SyncSend(data []byte) error {
	switch {
	case data == nil:
		return errors.New("invalid nil message data")
	case mc.IsClosed():
		return errors.New("[net] Connection is closed")
	default:
		return mc.syncSend(data)
	}
}
// syncSend writes data to the TCP connection. A write error is logged together
// with the number of bytes written and then returned; nil is returned on
// success.
func (mc *mConnection) syncSend(data []byte) error {
	written, err := mc.conn.Write(data)
	if err == nil {
		return nil
	}
	log().Error("[net] send message error: ", err, ", write len:", written)
	return err
}

// loopWriter is the body of the connection's writer goroutine. It drains
// outChan, batches small messages into one buffer and writes the batch with
// syncSend.
//
// The batch is written when any of the following holds:
//  1. at least gMinPacketSize bytes are buffered,
//  2. data is buffered and gSendPacketTimeoutTime has passed since the last
//     send,
//  3. a message did not fit in the remaining buffer space (that message is
//     then written on its own, right after the batch),
//  4. the nil exit signal was received from outChan.
//
// The goroutine returns once the exit signal has been handled or as soon as a
// write fails; onWriterThreadExit runs on every return path.
func (mc *mConnection) loopWriter() {
	defer mc.onWriterThreadExit()

	// Batching buffer. Its length is fixed for the life of this goroutine, so
	// all capacity checks below use len(writeBuffer) rather than re-reading
	// the mutable gWriteBufferMaxSize.
	writeBuffer := make([]byte, gWriteBufferMaxSize)

	// Set once the nil exit signal has been taken from outChan.
	exitFlag := false

	// Message that did not fit into writeBuffer during the current pass.
	var extraMsg []byte

	// Number of bytes currently batched in writeBuffer.
	allLen := 0

	// Time of the last flush; drives the gSendPacketTimeoutTime rule.
	lastSendTime := time.Now()

	for {
		// busy is true when the pass ended on an overflow message, which
		// means more work is likely queued and the sleep should be skipped.
		busy := false

	drain:
		for {
			select {
			case msg := <-mc.outChan:
				if msg == nil {
					// Exit signal: flush what is batched, then stop.
					exitFlag = true
					break drain
				}
				if allLen+len(msg) > len(writeBuffer) {
					// Does not fit: flush the batch and send msg by itself.
					extraMsg = msg
					busy = true
					break drain
				}
				allLen += copy(writeBuffer[allLen:], msg)

			default:
				// Nothing queued right now.
				break drain
			}
		}

		flush := exitFlag || extraMsg != nil || allLen >= gMinPacketSize ||
			(allLen > 0 && time.Since(lastSendTime) >= gSendPacketTimeoutTime)
		if flush {
			// Skip the write when nothing is batched; a zero-length Write
			// would only cost a system call.
			if allLen > 0 {
				if err := mc.syncSend(writeBuffer[:allLen]); err != nil {
					return
				}
			}
			allLen = 0
			lastSendTime = time.Now()
		}

		// The overflow message goes out after the batch to preserve ordering.
		if extraMsg != nil {
			if err := mc.syncSend(extraMsg); err != nil {
				return
			}
			extraMsg = nil
		}

		if exitFlag {
			return
		}

		// Idle pass: back off before polling outChan again.
		if !busy {
			time.Sleep(gWriteGoroutineSleepTime)
		}
	}
}

// loopReader is the body of the connection's reader goroutine. For every
// packet it reads the fixed-size head, asks the codec for the data length,
// reads that many data bytes and hands head plus data to OnRecvEvent.
//
// The goroutine returns on any read, parse or deadline error, and also when
// the announced lengths do not fit into the read buffer; onReaderThreadExit
// runs on every return path.
func (mc *mConnection) loopReader() {
	defer mc.onReaderThreadExit()

	headSize := mc.codec.GetHeadLen()
	readBuffer := make([]byte, gReadBufferMaxSize)

	// Largest data length that still fits behind the head. The int
	// conversions are no-ops if the codec already works with int; they keep
	// the comparisons valid for other integer types as well.
	maxDataLen := len(readBuffer) - int(headSize)
	if int(headSize) < 0 || maxDataLen < 0 {
		log().Error("[net] invalid packet head length:", headSize, ", read buffer size:", len(readBuffer))
		return
	}

	for {
		if _, err := io.ReadFull(mc.conn, readBuffer[:headSize]); err != nil {
			// A plain EOF is not reported as an error.
			if err != io.EOF {
				log().Error("[net] io.ReadFull error:", err)
			}
			return
		}

		// Parse the head to get the length of the data that follows.
		dataLen, err := mc.codec.PacketParse(readBuffer[:headSize])
		if err != nil {
			log().Error("[net] parse packet error:", err)
			return
		}

		// The length comes from the peer: reject values that would make the
		// slice expressions below go out of range and panic.
		if int(dataLen) < 0 || int(dataLen) > maxDataLen {
			log().Error("[net] packet data length out of range:", dataLen, ", max:", maxDataLen)
			return
		}

		if gReadTimeoutFlag > 0 {
			// Bound the time spent waiting for the packet's data.
			if err := mc.conn.SetReadDeadline(time.Now().Add(gReadMessageTimeout)); err != nil {
				log().Error("[net] SetReadDeadline error:", err)
				return
			}
		}

		// Read the data part (or fail on the deadline set above).
		if _, err := io.ReadFull(mc.conn, readBuffer[headSize:headSize+dataLen]); err != nil {
			log().Error("[net] Read message data error:", err)
			return
		}

		if gReadTimeoutFlag > 0 {
			// Clear the deadline so waiting for the next head is unbounded.
			if err := mc.conn.SetReadDeadline(time.Time{}); err != nil {
				log().Error("[net] reset SetReadDeadline error:", err)
				return
			}
		}

		// Deliver the packet. The slice aliases readBuffer, which is reused
		// for the next packet, so a receiver that keeps the data or handles
		// it in another goroutine must copy it.
		mc.OnRecvEvent(readBuffer[:headSize+dataLen])
	}
}

// OnDestroyEvent notifies the session that the connection is going away by
// calling its OnTerminate. It does nothing when there is no session.
func (mc *mConnection) OnDestroyEvent() {
	if mc.session == nil {
		return
	}
	mc.session.OnTerminate()
}

// OnRecvEvent passes a received packet to the session's OnRecv. It does
// nothing when there is no session.
//
// When called from loopReader, msg aliases the reader's reusable buffer.
func (mc *mConnection) OnRecvEvent(msg []byte) {
	if mc.session == nil {
		return
	}
	mc.session.OnRecv(msg)
}

// OnConnectEvent binds this connection to the session with SetConnection and
// then calls the session's OnEstablish. It does nothing when there is no
// session.
func (mc *mConnection) OnConnectEvent() {
	session := mc.session
	if session == nil {
		return
	}
	session.SetConnection(mc)
	session.OnEstablish()
}

// start configures the socket, marks the connection open, fires the connect
// callback and launches the reader and writer goroutines.
//
// The socket options are best effort: a failure is logged and start carries
// on, so the connection still comes up with the system defaults.
func (mc *mConnection) start() {
	// Disable Nagle's algorithm; loopWriter does its own batching.
	if err := mc.conn.SetNoDelay(true); err != nil {
		log().Error("[net] SetNoDelay error:", err)
	}

	// Enable TCP keep-alive probes.
	if err := mc.conn.SetKeepAlive(true); err != nil {
		log().Error("[net] SetKeepAlive error:", err)
	}

	// The connection counts as open from here on; IsClosed reports false.
	atomic.StoreInt32(&mc.isClosed, gOpenFlag)

	// Connect callback runs before any data is read.
	mc.OnConnectEvent()

	go mc.loopReader()
	go mc.loopWriter()
}

// tryStop atomically moves isClosed from gOpenFlag to gClosedFlag.
//
// It returns false when this call performed the transition, and true when
// the connection was not open at the time of the call (so nothing changed).
func (mc *mConnection) tryStop() bool {
	swapped := atomic.CompareAndSwapInt32(&mc.isClosed, gOpenFlag, gClosedFlag)
	return !swapped
}

// stop begins or completes the shutdown of the connection.
//
// If the connection was already marked closed, the final teardown (doExit) is
// run. Otherwise this call is the one that marks it closed: it fires the
// destroy callback and asks the writer goroutine to flush and exit.
func (mc *mConnection) stop() {
	if mc.tryStop() {
		// Somebody else closed it first; finish the teardown.
		mc.doExit()
		return
	}

	// First closer: notify the session, then let the writer drain and exit.
	mc.OnDestroyEvent()
	mc.setWriterThreadExit()
}

// setWriterThreadExit queues the exit signal for loopWriter, which flushes
// its batched data and returns when it receives it. The send blocks while
// outChan is full.
func (mc *mConnection) setWriterThreadExit() {
	var exitSignal []byte // a nil message is what loopWriter treats as "exit"
	mc.outChan <- exitSignal
}

// onWriterThreadExit runs when loopWriter returns.
//
// If this call is the one that marks the connection closed, it fires the
// destroy callback and calls Close so that the reader goroutine exits too.
// If the connection was already marked closed, it runs the final teardown.
func (mc *mConnection) onWriterThreadExit() {
	if !mc.tryStop() {
		// Writer went down first: notify the session and kick the reader.
		mc.OnDestroyEvent()
		mc.Close()
		return
	}

	// Already marked closed by someone else; finish the teardown.
	mc.doExit()
}

// onReaderThreadExit runs when loopReader returns (it is deferred there) and
// hands over to stop, which either starts the shutdown or completes it.
func (mc *mConnection) onReaderThreadExit() {
	mc.stop()
}

// doExit performs the final teardown of the connection. Guarded by
// closeOnce, the teardown runs a single time no matter how often doExit is
// called.
func (mc *mConnection) doExit() {
	teardown := func() {
		// Close the socket first.
		mc.conn.Close()

		// Publish that the socket has been closed; Wait reads this flag.
		atomic.StoreInt32(&mc.doClose, 1)

		// Finally drop the connection from the connection manager.
		mc.net.getConnMgr().remove(mc)
	}
	mc.closeOnce.Do(teardown)
}

// GetConnID returns the connection id given to newConnection.
func (mc *mConnection) GetConnID() uint32 {
	return mc.connID
}

// RemoteAddr returns the remote network address reported by the underlying
// TCP connection.
func (mc *mConnection) RemoteAddr() net.Addr {
	return mc.conn.RemoteAddr()
}

// LocalAddr returns the local network address reported by the underlying TCP
// connection.
func (mc *mConnection) LocalAddr() net.Addr {
	return mc.conn.LocalAddr()
}

// SendMsg queues data on outChan for the writer goroutine.
//
// It returns an error when data is nil. When the connection is closed the
// message is dropped and nil is returned. When outChan is full the send is
// retried, up to 20 attempts in total; after each of the last failed attempts
// before the final one the caller sleeps 2ms. If every attempt fails an
// error is returned.
func (mc *mConnection) SendMsg(data []byte) error {
	if data == nil {
		return errors.New("send nil message")
	}
	if mc.IsClosed() {
		return nil
	}

	const (
		maxAttempts = 20                   // total number of non-blocking send attempts
		pausedTail  = 3                    // size of the trailing window of attempts that pause
		retryPause  = 2 * time.Millisecond // pause between attempts inside that window
	)
	for attempt := 1; ; attempt++ {
		select {
		case mc.outChan <- data:
			return nil
		default:
			// Channel full; decide below whether to retry.
		}

		if mc.IsClosed() {
			return nil
		}
		if attempt >= maxAttempts {
			return errors.New("[net] send channel is full")
		}
		if attempt > maxAttempts-pausedTail {
			time.Sleep(retryPause)
		}
	}
}

// IsClosed reports whether the connection is currently marked closed. This is
// also the case before start has run, because a new connection begins in the
// closed state.
func (mc *mConnection) IsClosed() bool {
	state := atomic.LoadInt32(&mc.isClosed)
	return state == gClosedFlag
}

// SendMsgTimeout queues data on outChan for the writer goroutine, waiting at
// most sec seconds for room in the channel. A negative sec is treated as 0.
//
// It returns an error when data is nil or when the message could not be
// queued in time. When the connection is closed the message is dropped and
// nil is returned.
func (mc *mConnection) SendMsgTimeout(data []byte, sec int) error {
	if data == nil {
		return errors.New("[net] invalid nil message data")
	}

	if sec < 0 {
		sec = 0
	}

	if mc.IsClosed() {
		return nil
	}

	// Fast path: queue without arming a timer. This also makes a zero timeout
	// deterministic; with a single select, an already expired timer and a
	// ready channel would both be eligible and one is picked at random.
	select {
	case mc.outChan <- data:
		return nil
	default:
	}
	if sec == 0 {
		return errors.New("[net] time out")
	}

	// Slow path: wait for room, and release the timer as soon as we return
	// instead of leaving it pending until it fires.
	timer := time.NewTimer(time.Duration(sec) * time.Second)
	defer timer.Stop()

	select {
	case mc.outChan <- data:
		return nil
	case <-timer.C:
		return errors.New("[net] time out")
	}
}

// SetProperty stores value under key in the connection's property map,
// replacing any existing entry. The write is done while holding the write
// lock of propertyLock.
func (mc *mConnection) SetProperty(key string, value interface{}) {
	mc.propertyLock.Lock()
	defer mc.propertyLock.Unlock()
	mc.property[key] = value
}

// GetProperty returns the value stored under key in the connection's property
// map, or nil when the key is absent. The lookup is done while holding the
// read lock of propertyLock.
func (mc *mConnection) GetProperty(key string) interface{} {
	mc.propertyLock.RLock()
	defer mc.propertyLock.RUnlock()

	// Indexing a map with a missing key yields the zero value, which for
	// interface{} is nil.
	return mc.property[key]
}

// RemoveProperty deletes key from the connection's property map while holding
// the write lock of propertyLock. Deleting a key that is not present is a
// no-op.
func (mc *mConnection) RemoveProperty(key string) {
	mc.propertyLock.Lock()
	defer mc.propertyLock.Unlock()
	delete(mc.property, key)
}

// GetSession returns the session given to newConnection.
func (mc *mConnection) GetSession() ISession {
	return mc.session
}

// Close asks the connection to shut down. It does not close the socket
// itself: it moves the read deadline to the current time so that the read
// loopReader is blocked in fails and the reader goroutine exits, which in
// turn runs the shutdown sequence.
func (mc *mConnection) Close() {
	// The error is deliberately discarded; there is nothing to do about it here.
	_ = mc.conn.SetReadDeadline(time.Now())
}

// Wait reports whether doExit has already closed the socket (doClose is 1).
// Despite its name it does not block; callers that need to wait have to poll.
func (mc *mConnection) Wait() bool {
	closed := atomic.LoadInt32(&mc.doClose)
	return closed == 1
}
